Solutions/MimecastSEG/Data Connectors/TransformData/siem_parser.py (146 lines of code) (raw):

from ..Helpers.date_helper import DateHelper from ..Models.Enum.siem_types import SiemTypes import logging class SiemParser: def __init__(self): self.date_helper = DateHelper() def parse(self, logs): parsed_logs = [] if logs: for log in logs: if 'checkpoints' in log: continue log_type = log['logType'].strip() if log_type == SiemTypes.TYPE_AV: parsed_logs.append(self.parse_av_event(log)) elif log_type == SiemTypes.TYPE_DELIVERY: parsed_logs.append(self.parse_delivery_event(log)) elif log_type == SiemTypes.TYPE_PROCESS: parsed_logs.append(self.parse_process_event(log)) elif log_type == SiemTypes.TYPE_RECEIPT: parsed_logs.append(self.parse_receipt_event(log)) elif log_type == SiemTypes.TYPE_TTP_URL: parsed_logs.append(self.parse_ttp_url_event(log)) elif log_type == SiemTypes.TYPE_TTP_IMPERSONATION: parsed_logs.append(self.parse_ttp_impersonation_event(log)) elif log_type == SiemTypes.TYPE_TTP_ATTACHMENT: parsed_logs.append(self.parse_ttp_attachment_event(log)) elif log_type == SiemTypes.TYPE_TTP_IEP: parsed_logs.append(self.parse_ttp_iep_event(log)) elif log_type == SiemTypes.TYPE_JOURNAL: parsed_logs.append(self.parse_journal_event(log)) elif log_type == SiemTypes.TYPE_SPAMEVENTTHREAD: parsed_logs.append(self.parse_spameventthread(log)) else: parsed_logs.append(self.parse_other_event(log)) return parsed_logs def parse_av_event(self, event): """ Parse a single AV event :param event: event to be parsed (single line in log) :return: parsed event """ category = 'mail_av' event_id = 'mail_av' timestamp = self.date_helper.convert_from_mimecast_format(event['datetime']) event.update({'mimecastEventId': event_id, 'mimecastEventCategory': category, 'time_generated': timestamp}) return event def parse_delivery_event(self, event): """ Parse a single Delivery event Based on: - Delivered (is the mail delivered at all) - UseTls (was tls used) :param event: event to be parsed (single line in log) :return: parsed event """ delivered = event['Delivered'] if 'Delivered' in event else None use_tls = event['UseTls'] if 'UseTls' in event else None if delivered is not None: if delivered == 'true': if use_tls == 'Yes': event_id = 'mail_delivery_delivered' else: event_id = 'mail_delivery_delivered_notls' else: event_id = 'mail_delivery_not_delivered' else: event_id = 'mail_delivery_other' category = 'mail_delivery' timestamp = self.date_helper.convert_from_mimecast_format(event['datetime']) event.update({'mimecastEventId': event_id, 'mimecastEventCategory': category, 'time_generated': timestamp}) return event def parse_process_event(self, event): """ Parse a single Process event Based on: - Act (action) :param event: event to be parsed (single line in log) :return: parsed event """ action = event['Act'] if 'Act' in event else None if action == 'Acc': event_id = 'mail_process_accepted' elif action == 'Hld': event_id = 'mail_process_held' elif action == 'Sdbx': event_id = 'mail_process_sandboxed' elif action == 'Rty': event_id = 'mail_process_retried' elif action == 'Bnc': event_id = 'mail_process_bounced' else: event_id = 'mail_process_other' category = 'mail_process' timestamp = self.date_helper.convert_from_mimecast_format(event['datetime']) event.update({'mimecastEventId': event_id, 'mimecastEventCategory': category, 'time_generated': timestamp}) return event def parse_receipt_event(self, event): """ Parse a single Receipt event Based on: - Act (action) - TlsVer (TLS version) - Virus (was there a virus in a mail) - SpamInfo (is mail a spam) :param event: event to be parsed (single line in log) :return: parsed event """ action = event['Act'] if 'Act' in event else None tls_version = event['TlsVer'] if 'TlsVer' in event else None is_virus = True if 'Virus' in event else False is_spam = False if 'SpamInfo' not in event or event['SpamInfo'] == '[]' else True if is_virus: event_id = 'mail_receipt_virus' elif is_spam: event_id = 'mail_receipt_spam' elif action == 'Rej': event_id = 'mail_receipt_rejected' elif action == 'Ign': event_id = 'mail_receipt_ignored' elif action == 'Bnc': event_id = 'mail_receipt_bounced' elif tls_version is not None and tls_version.startswith('TLSv1'): event_id = 'mail_receipt_received' elif action == 'Acc' and (tls_version is None or not tls_version.startswith('TLSv1')): event_id = 'mail_receipt_received_notls' else: event_id = 'mail_receipt_other' category = 'mail_receipt' timestamp = self.date_helper.convert_from_mimecast_format(event['datetime']) event.update({'mimecastEventId': event_id, 'mimecastEventCategory': category, 'time_generated': timestamp}) return event def parse_ttp_url_event(self, event): """ Parse a single TTP URL event :param event: event to be parsed (single line in log) :return: parsed event """ event_id = 'mail_ttp_url' category = 'mail_ttp_url' timestamp = self.date_helper.convert_from_mimecast_format(event['datetime']) event.update({'mimecastEventId': event_id, 'mimecastEventCategory': category, 'time_generated': timestamp}) return event def parse_ttp_impersonation_event(self, event): """ Parse a single TTP Impersonation event :param event: event to be parsed (single line in log) :return: parsed event """ event_id = 'mail_ttp_impersonation' category = 'mail_ttp_impersonation' timestamp = self.date_helper.convert_from_mimecast_format(event['datetime']) event.update({'mimecastEventId': event_id, 'mimecastEventCategory': category, 'time_generated': timestamp}) return event def parse_ttp_attachment_event(self, event): """ Parse a single TTP Attachment event :param event: event to be parsed (single line in log) :return: parsed event """ event_id = 'mail_ttp_attachment' category = 'mail_ttp_attachment' timestamp = self.date_helper.convert_from_mimecast_format(event['datetime']) event.update({'mimecastEventId': event_id, 'mimecastEventCategory': category, 'time_generated': timestamp}) return event def parse_ttp_iep_event(self, event): """ Parse a single TTP IEP event :param event: event to be parsed (single line in log) :return: parsed event """ event_id = 'mail_ttp_iep' category = 'mail_ttp_iep' timestamp = self.date_helper.convert_from_mimecast_format(event['datetime']) event.update({'mimecastEventId': event_id, 'mimecastEventCategory': category, 'time_generated': timestamp}) return event def parse_journal_event(self, event): """Parse a single Journaling event. :param event: event to be parsed (single line in log) :return: parsed event """ event_id = 'mail_journal' category = 'mail_journal' timestamp = self.date_helper.convert_from_mimecast_format(event['datetime']) event.update({'mimecastEventId': event_id, 'mimecastEventCategory': category, 'time_generated': timestamp}) return event def parse_spameventthread(self, event): """Parse a single Spameventthread event. :param event: event to be parsed (single line in log) :return: parsed event """ event_id = 'mail_spameventthread' category = 'mail_spameventthread' timestamp = self.date_helper.convert_from_mimecast_format(event['datetime']) event.update({'mimecastEventId': event_id, 'mimecastEventCategory': category, 'time_generated': timestamp}) return event def parse_other_event(self, event): """ Parse a single event that we don't expect :param event: event to be parsed (single line in log) :event_type: name of event type from response header :return: parsed event as unicode """ event_id = 'other_{0}'.format(event['logType']) category = 'other' logging.warning('Unexpected log type: "{0}"'.format(event['logType'])) timestamp = self.date_helper.convert_from_mimecast_format(event['datetime']) event.update({'mimecastEventId': event_id, 'mimecastEventCategory': category, 'time_generated': timestamp}) return event